Each problem consists of a function to implement and instructions on how to implement the function. The parts of the function that need to be implemented are marked with a # TODO comment. After implementing the function, run the cell to test it against the unit tests we've provided. For each problem, we provide one or more unit tests from our project_tests package. These unit tests won't tell you if your answer is correct, but will warn you of any major errors. Your code will be checked for the correct solution when you submit it to Udacity.
When you implement the functions, you'll only need to use the packages you've used in the classroom, like Pandas and NumPy. These packages will be imported for you. We recommend you don't add any import statements, otherwise the grader might not be able to run your code.
The other packages that we're importing are helper, project_helper, and project_tests. These are custom packages built to help you solve the problems. The helper and project_helper modules contain utility functions and graph functions. The project_tests module contains the unit tests for all the problems.
import sys
!{sys.executable} -m pip install -r requirements.txt
import pandas as pd
import numpy as np
import helper
import project_helper
import project_tests
While using real data will give you hands-on experience, it doesn't cover all the topics we try to condense into one project. We'll solve this by creating new stocks. We've created a scenario where companies mining Terbium are making huge profits. All the companies in this sector of the market are made up. They represent a sector with large growth that will be used for demonstration later in this project.
df_original = pd.read_csv('../../data/project_2/eod-quotemedia.csv', parse_dates=['date'], index_col=False)
# Add TB sector to the market
df = df_original
#df = pd.concat([df] + project_helper.generate_tb_sector(df[df['ticker'] == 'AAPL']['date']), ignore_index=True)
close = df.reset_index().pivot(index='date', columns='ticker', values='adj_close')
high = df.reset_index().pivot(index='date', columns='ticker', values='adj_high')
low = df.reset_index().pivot(index='date', columns='ticker', values='adj_low')
print('Loaded Data')
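As a minimal sketch of what the pivot calls above do, the following uses a tiny made-up quote table (the tickers AAA and BBB and the names long_df and wide_close are hypothetical, not part of the project data). It turns one row per date and ticker into a matrix with dates as the index and tickers as the columns.

```python
import pandas as pd

# Hypothetical long-format quotes: one row per (date, ticker) pair.
long_df = pd.DataFrame({
    'date': pd.to_datetime(['2020-01-01', '2020-01-01', '2020-01-02', '2020-01-02']),
    'ticker': ['AAA', 'BBB', 'AAA', 'BBB'],
    'adj_close': [10.0, 20.0, 11.0, 19.0],
})

# Pivot to a wide matrix: dates as the index, tickers as the columns.
wide_close = long_df.pivot(index='date', columns='ticker', values='adj_close')
print(wide_close)
```

Selecting a column such as wide_close['AAA'] then gives the price series for a single ticker.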
To see what one of these 2-d matrices looks like, let's take a look at the closing prices matrix.
close
Let's see what a single stock looks like from the closing prices. For this example and future display examples in this project, we'll use Apple's stock (AAPL). If we tried to graph all the stocks, it would be too much information.
apple_ticker = 'AAPL'
project_helper.plot_stock(close[apple_ticker], '{} Stock'.format(apple_ticker))
In this project you will code and evaluate a "breakout" signal. It is important to understand where these steps fit in the alpha research workflow. The signal-to-noise ratio in trading signals is very low and, as such, it is very easy to fall into the trap of overfitting to noise. It is therefore inadvisable to jump right into signal coding. To help mitigate overfitting, it is best to start with a general observation and hypothesis; i.e., you should be able to answer the following question before you touch any data:
What feature of markets or investor behaviour would lead to a persistent anomaly that my signal will try to use?
Ideally the assumptions behind the hypothesis will be testable before you actually code and evaluate the signal itself. The workflow therefore is as follows:

In this project, we assume that the first three steps are done ("observe & research", "form hypothesis", "validate hypothesis"). The hypothesis you'll be using for this project is the following:
Using this hypothesis, let's start coding.
You'll use the price highs and lows as an indicator for the breakout strategy. In this section, implement get_high_lows_lookback to get the maximum high price and minimum low price over a window of days. The variable lookback_days contains the number of days to look in the past. Make sure this doesn't include the current day.
def get_high_lows_lookback(high, low, lookback_days):
    """
    Get the highs and lows in a lookback window.
    Parameters
    ----------
    high : DataFrame
        High price for each ticker and date
    low : DataFrame
        Low price for each ticker and date
    lookback_days : int
        The number of days to look back
    Returns
    -------
    lookback_high : DataFrame
        Lookback high price for each ticker and date
    lookback_low : DataFrame
        Lookback low price for each ticker and date
    """
    # TODO: Implement function
    # This leans on pandas instead of doing the job from scratch:
    # rolling() builds the window and shift() excludes the current day.
    # Max of the high prices over the window, excluding the present day
    lookback_high = high.rolling(window=lookback_days).max().shift(1)
    # Min of the low prices over the window, excluding the present day
    lookback_low = low.rolling(window=lookback_days).min().shift(1)
    # Return both DataFrames as a tuple
    return lookback_high, lookback_low
project_tests.test_get_high_lows_lookback(get_high_lows_lookback)
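To make the "don't include the current day" requirement concrete, here is a small sketch on a made-up single-ticker series (toy_high and the ticker AAA are hypothetical). It assumes a 2 day window, so each value should be the maximum of the two previous days only.

```python
import pandas as pd

# Hypothetical high prices for one ticker over five days.
toy_high = pd.DataFrame({'AAA': [1.0, 3.0, 2.0, 5.0, 4.0]})

# rolling(2).max() includes the current day; shift(1) moves every value
# one day forward so that each row only sees the two previous days.
toy_lookback_high = toy_high.rolling(window=2).max().shift(1)
print(toy_lookback_high)
```

The first two rows are NaN because there are not yet two full previous days to look back on.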
Let's use your implementation of get_high_lows_lookback to get the highs and lows for the past 50 days and compare them to their respective stock. Just like last time, we'll use Apple's stock as the example to look at.
lookback_days = 50
lookback_high, lookback_low = get_high_lows_lookback(high, low, lookback_days)
project_helper.plot_high_low(
close[apple_ticker],
lookback_high[apple_ticker],
lookback_low[apple_ticker],
'High and Low of {} Stock'.format(apple_ticker))
Using the generated indicator of highs and lows, create long and short signals using a breakout strategy. Implement get_long_short to generate the following signals:
| Signal | Condition |
|---|---|
| -1 | Low > Close Price |
| 1 | High < Close Price |
| 0 | Otherwise |
In this chart, Close Price is the close parameter. Low and High are the values generated from get_high_lows_lookback, the lookback_high and lookback_low parameters.
def get_long_short(close, lookback_high, lookback_low):
    """
    Generate the signals long, short, and do nothing.
    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookback_high : DataFrame
        Lookback high price for each ticker and date
    lookback_low : DataFrame
        Lookback low price for each ticker and date
    Returns
    -------
    long_short : DataFrame
        The long, short, and do nothing signals for each ticker and date
    """
    # TODO: Implement function
    # Loop over every cell and compare the close price with the
    # lookback high and low computed by get_high_lows_lookback
    row_length, col_length = close.shape
    # Start from an all-zero (do nothing) integer matrix
    signals = np.zeros(close.shape, dtype=np.int64)
    for i in range(row_length):
        for j in range(col_length):
            my_high = lookback_high.iat[i, j]  # high over the lookback period
            my_low = lookback_low.iat[i, j]  # low over the lookback period
            close_price = close.iat[i, j]
            if my_low > close_price:  # sell
                signals[i, j] = -1
            elif my_high < close_price:  # buy
                signals[i, j] = 1
            # otherwise the cell stays 0 (neutral)
    # Wrap the array in a DataFrame with the same index and columns as close
    my_result = pd.DataFrame(signals, index=close.index, columns=close.columns)
    # Return the DataFrame of signals
    return my_result
project_tests.test_get_long_short(get_long_short)
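The signal table can also be expressed with whole-DataFrame comparisons. The following is only a sketch on made-up numbers (toy_close, toy_lb_high, toy_lb_low and the ticker AAA are hypothetical), not the required solution. It relies on the fact that any comparison against NaN is False, so days without a full lookback window come out as 0.

```python
import pandas as pd

# Hypothetical close prices and lookback bands for one ticker.
toy_close = pd.DataFrame({'AAA': [10.0, 12.0, 8.0, 10.0]})
toy_lb_high = pd.DataFrame({'AAA': [float('nan'), 11.0, 11.0, 11.0]})
toy_lb_low = pd.DataFrame({'AAA': [float('nan'), 9.0, 9.0, 9.0]})

# 1 where the close breaks above the lookback high, else 0.
long_mask = (toy_close > toy_lb_high).astype(int)
# 1 where the close breaks below the lookback low, else 0.
short_mask = (toy_close < toy_lb_low).astype(int)

# Long minus short gives 1, -1, or 0 per cell.
toy_signal = long_mask - short_mask
print(toy_signal['AAA'].tolist())  # [0, 1, -1, 0]
```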
Let's compare the signals you generated against the close prices. This chart will show a lot of signals. Too many in fact. We'll talk about filtering the redundant signals in the next problem.
signal = get_long_short(close, lookback_high, lookback_low)
project_helper.plot_signal(
close[apple_ticker],
signal[apple_ticker],
'Long and Short of {} Stock'.format(apple_ticker))
That was a lot of repeated signals! If we're already shorting a stock, having an additional signal to short a stock isn't helpful for this strategy. This also applies to additional long signals when the last signal was long.
Implement filter_signals to filter out repeated long or short signals within the lookahead_days. If the previous signal was the same, change the signal to 0 (do nothing signal). For example, say you have a single stock time series that is
[1, 0, 1, 0, 1, 0, -1, -1]
Running filter_signals with a lookahead of 3 days should turn those signals into
[1, 0, 0, 0, 1, 0, -1, 0]
To help you implement the function, we have provided you with the clear_signals function. This will remove all signals within a window after the last signal. For example, say you're using a window size of 3 with clear_signals. It would turn the Series of long signals
[0, 1, 0, 0, 1, 1, 0, 1, 0]
into
[0, 1, 0, 0, 0, 1, 0, 0, 0]
clear_signals only takes a Series of the same type of signals, where 1 is the signal and 0 is no signal. It can't take a mix of long and short signals. Using this function, implement filter_signals.
For implementing filter_signals, we don't recommend you try to find a vectorized solution. Instead, you should use iterrows over each column.
def clear_signals(signals, window_size):
    """
    Clear out signals in a Series of just long or short signals.
    Remove the number of signals down to 1 within the window size time period.
    Parameters
    ----------
    signals : Pandas Series
        The long, short, or do nothing signals
    window_size : int
        The number of days to have a single signal
    Returns
    -------
    signals : Pandas Series
        Signals with the signals removed from the window size
    """
    # Start with buffer of window size
    # This handles the edge case of calculating past_signal in the beginning
    clean_signals = [0]*window_size
    for signal_i, current_signal in enumerate(signals):
        # Check if there was a signal in the past window_size of days
        has_past_signal = bool(sum(clean_signals[signal_i:signal_i+window_size]))
        # Use the current signal if there's no past signal, else 0/False
        clean_signals.append(not has_past_signal and current_signal)
    # Remove buffer
    clean_signals = clean_signals[window_size:]
    # Return the signals as a Series of ints
    # (np.int was removed from recent NumPy releases, so use the built-in int)
    return pd.Series(np.array(clean_signals).astype(int), signals.index)
def filter_signals(signal, lookahead_days):
    """
    Filter out signals in a DataFrame.
    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_days : int
        The number of days to look ahead
    Returns
    -------
    filtered_signal : DataFrame
        The filtered long, short, and do nothing signals for each ticker and date
    """
    # TODO: Implement function
    # The goal is to clean the signal so that it does not repeat in the
    # same direction within the lookahead window.
    # Split the signal into a long-only and a short-only DataFrame, each
    # holding 1 where that signal fires and 0 elsewhere. Building them with
    # comparisons avoids writing into the rows yielded by iterrows, which
    # pandas does not guarantee will update the original DataFrame.
    long_df = (signal == 1).astype(int)
    short_df = (signal == -1).astype(int)
    # Apply clear_signals to each column to drop repeats within lookahead_days
    filtered_long = long_df.apply(lambda x: clear_signals(x, lookahead_days), axis=0)
    filtered_short = short_df.apply(lambda x: clear_signals(x, lookahead_days), axis=0)
    # Recombine: long signals stay 1, short signals become -1
    filtered_signal = filtered_long - filtered_short
    return filtered_signal
project_tests.test_filter_signals(filter_signals)
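The worked example from the problem description can be checked by hand with a short sketch. The helper clear_repeats below is a hypothetical stand-alone restatement of the clear_signals logic, written so this block runs on its own. It is applied separately to the long and short parts of the example series with a 3 day window.

```python
import pandas as pd

def clear_repeats(signals, window_size):
    """Hypothetical restatement of the clear_signals logic for a 0/1 Series."""
    kept = [0] * window_size  # padding so the first days have a "past" window
    for i, current in enumerate(signals):
        # kept[i:i + window_size] holds the previous window_size cleaned values
        recent = any(kept[i:i + window_size])
        kept.append(0 if recent else int(current))
    return pd.Series(kept[window_size:], index=signals.index, dtype=int)

raw = pd.Series([1, 0, 1, 0, 1, 0, -1, -1])

# Clean the long and short signals separately, then recombine them.
longs = clear_repeats((raw == 1).astype(int), 3)
shorts = clear_repeats((raw == -1).astype(int), 3)
filtered = longs - shorts
print(filtered.tolist())  # [1, 0, 0, 0, 1, 0, -1, 0]
```

Splitting by direction matters because a short signal should not suppress a long signal that follows it, and vice versa.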
Let's view the same chart as before, but with the redundant signals removed.
signal_5 = filter_signals(signal, 5)
signal_10 = filter_signals(signal, 10)
signal_20 = filter_signals(signal, 20)
for signal_data, signal_days in [(signal_5, 5), (signal_10, 10), (signal_20, 20)]:
project_helper.plot_signal(
close[apple_ticker],
signal_data[apple_ticker],
'Long and Short of {} Stock with {} day signal window'.format(apple_ticker, signal_days))
With the trading signal done, we can start working on evaluating how many days to short or long the stocks. In this problem, implement get_lookahead_prices to get the close price days ahead in time. You can get the number of days from the variable lookahead_days. We'll use the lookahead prices to calculate future returns in another problem.
def get_lookahead_prices(close, lookahead_days):
    """
    Get the lookahead prices for `lookahead_days` number of days.
    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_days : int
        The number of days to look ahead
    Returns
    -------
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date
    """
    # TODO: Implement function
    # Build the rows in a list first, then turn the list into a DataFrame
    lookahead_rows = []
    row_len, col_len = close.shape
    # Loop through the rows of the dataset
    for pos in range(row_len):
        l_index = pos + lookahead_days
        if l_index < row_len:
            # Prices lookahead_days after the current row
            lookahead_rows.append(close.iloc[l_index].values)
        else:
            # No future data available: fill the whole row with NaN
            lookahead_rows.append(np.full(col_len, np.nan))
    # Change the list that stores everything into a DataFrame
    my_data = pd.DataFrame(lookahead_rows, index=close.index, columns=close.columns)
    # https://stackoverflow.com/questions/42593104/convert-list-into-a-pandas-data-frame
    # https://stackoverflow.com/questions/46996315/a-better-way-for-a-python-for-loop
    return my_data
project_tests.test_get_lookahead_prices(get_lookahead_prices)
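For comparison, the same "price N days ahead" idea can be sketched with a negative shift. This is an illustrative alternative on made-up data (toy_close and the ticker AAA are hypothetical), assuming a lookahead of 2 days.

```python
import pandas as pd

# Hypothetical close prices for one ticker over four days.
toy_close = pd.DataFrame({'AAA': [10.0, 11.0, 12.0, 13.0]})

# shift(-2) pulls each price two rows earlier, so row t holds the close
# from day t + 2. The last two rows have no future data and become NaN.
toy_lookahead = toy_close.shift(-2)
print(toy_lookahead)
```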
Using the get_lookahead_prices function, let's generate lookahead closing prices for 5, 10, and 20 days.
Let's also chart a subsection of a few months of the Apple stock instead of years. This will allow you to view the differences between the 5, 10, and 20 day lookaheads. Otherwise, they will mesh together when looking at a chart that is zoomed out.
lookahead_5 = get_lookahead_prices(close, 5)
lookahead_10 = get_lookahead_prices(close, 10)
lookahead_20 = get_lookahead_prices(close, 20)
project_helper.plot_lookahead_prices(
close[apple_ticker].iloc[150:250],
[
(lookahead_5[apple_ticker].iloc[150:250], 5),
(lookahead_10[apple_ticker].iloc[150:250], 10),
(lookahead_20[apple_ticker].iloc[150:250], 20)],
'5, 10, and 20 day Lookahead Prices for Slice of {} Stock'.format(apple_ticker))
Implement get_return_lookahead to generate the log price return between the closing price and the lookahead price.
def get_return_lookahead(close, lookahead_prices):
    """
    Calculate the log returns from the lookahead days to the signal day.
    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date
    Returns
    -------
    lookahead_returns : DataFrame
        The lookahead log returns for each ticker and date
    """
    # TODO: Implement function
    # Collect the returns row by row in a list
    log_returns = []
    for row in range(close.shape[0]):
        row_returns = []
        # Pair each close price with its lookahead price
        for cp, lp in zip(close.iloc[row].values, lookahead_prices.iloc[row].values):
            # Log return from the close price to the lookahead price
            row_returns.append(np.log(lp / cp))
        log_returns.append(row_returns)
    # Turn the list into a DataFrame
    final_result = pd.DataFrame(log_returns, index=close.index, columns=close.columns)
    return final_result
project_tests.test_get_return_lookahead(get_return_lookahead)
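The log return between a close price and its lookahead price is log(lookahead / close), which equals log(lookahead) - log(close). A minimal sketch on made-up prices (toy_close, toy_lookahead and the ticker AAA are hypothetical):

```python
import numpy as np
import pandas as pd

# Hypothetical close prices and their lookahead prices.
toy_close = pd.DataFrame({'AAA': [100.0, 110.0]})
toy_lookahead = pd.DataFrame({'AAA': [110.0, 99.0]})

# Element-wise log return: log(lookahead) - log(close).
toy_log_returns = np.log(toy_lookahead) - np.log(toy_close)
print(toy_log_returns)
```

A 10% rise gives log(1.1), roughly 0.095, while a 10% fall gives log(0.9), roughly -0.105, so equal percentage moves up and down do not produce equal log returns.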
Using the same lookahead prices and same subsection of the Apple stock from the previous problem, we'll view the lookahead returns.
In order to view price returns on the same chart as the stock, a second y-axis will be added. When viewing this chart, the axis for the price of the stock will be on the left side, like previous charts. The axis for price returns will be located on the right side.
price_return_5 = get_return_lookahead(close, lookahead_5)
price_return_10 = get_return_lookahead(close, lookahead_10)
price_return_20 = get_return_lookahead(close, lookahead_20)
project_helper.plot_price_returns(
close[apple_ticker].iloc[150:250],
[
(price_return_5[apple_ticker].iloc[150:250], 5),
(price_return_10[apple_ticker].iloc[150:250], 10),
(price_return_20[apple_ticker].iloc[150:250], 20)],
'5, 10, and 20 day Lookahead Returns for Slice {} Stock'.format(apple_ticker))
Using the price returns, generate the signal returns.
def get_signal_return(signal, lookahead_returns):
    """
    Compute the signal returns.
    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_returns : DataFrame
        The lookahead log returns for each ticker and date
    Returns
    -------
    signal_return : DataFrame
        Signal returns for each ticker and date
    """
    # TODO: Implement function
    # Collect the signal returns row by row in a list
    s_return = []
    for row in range(signal.shape[0]):
        s_ret = []
        # Pair each signal with its lookahead return
        for sp, lp in zip(signal.iloc[row].values, lookahead_returns.iloc[row].values):
            # A short signal (-1) flips the sign of the return, 0 zeroes it out
            s_ret.append(sp * lp)
        s_return.append(s_ret)
    # Return the signal return list as a DataFrame
    signal_return = pd.DataFrame(s_return, index=signal.index, columns=signal.columns)
    return signal_return
project_tests.test_get_signal_return(get_signal_return)
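A signal return is the lookahead return multiplied by the signal, so a short position gains when the price falls and a do nothing signal earns nothing. A small sketch with made-up values (toy_signal, toy_returns and the ticker AAA are hypothetical):

```python
import pandas as pd

# Hypothetical signals (long, short, do nothing) and lookahead log returns.
toy_signal = pd.DataFrame({'AAA': [1, -1, 0]})
toy_returns = pd.DataFrame({'AAA': [0.05, 0.02, -0.03]})

# Element-wise product: long keeps the return, short flips its sign,
# and a zero signal yields a zero return.
toy_signal_return = toy_signal * toy_returns
print(toy_signal_return)
```

Here the short on the second day loses 0.02 because the price rose over the lookahead window.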
Let's continue using the previous lookahead prices to view the signal returns. Just like before, the axis for the signal returns is on the right side of the chart.
title_string = '{} day Lookahead Signal Returns for {} Stock'
signal_return_5 = get_signal_return(signal_5, price_return_5)
signal_return_10 = get_signal_return(signal_10, price_return_10)
signal_return_20 = get_signal_return(signal_20, price_return_20)
project_helper.plot_signal_returns(
close[apple_ticker],
[
(signal_return_5[apple_ticker], signal_5[apple_ticker], 5),
(signal_return_10[apple_ticker], signal_10[apple_ticker], 10),
(signal_return_20[apple_ticker], signal_20[apple_ticker], 20)],
[title_string.format(5, apple_ticker), title_string.format(10, apple_ticker), title_string.format(20, apple_ticker)])
project_helper.plot_signal_histograms(
[signal_return_5, signal_return_10, signal_return_20],
'Signal Return',
('5 Days', '10 Days', '20 Days'))
A few things stand out right away: all three distributions are unimodal, roughly symmetric, and centered at zero. At first glance they look normally distributed, but I would have to run a Jarque-Bera or Shapiro-Wilk test to confirm that the signal returns are indeed normally distributed. I have plotted them below, overlaid with a normal distribution with the same mean and standard deviation, to highlight the difference.
What concerns me is that all three graphs extend across the entire negative range from -0.5 to -1. I would have expected the x-axis to span the same distance on both sides, which leads me to believe that there are some severe outliers on the left-hand side of each histogram. Scrolling through the histograms, each one has an outlier past -1. That is a big problem and warrants investigation.
There are outliers on both the left and the right of the distribution curves, which I can see by scrolling, but the biggest one is on the far left, where the strategy went very wrong. Perhaps a shock in the system?
To know that my strategy is working, I would like the distribution to have a positive mean, with its mass shifted to the right of zero, rather than being centered at zero. So this is probably not a good trading strategy.
Going back, the 10 day histogram seems to have more data concentrated around the middle, while the 20 day histogram seems to have more kurtosis, because its peak is lower and its sides are fatter.
You might have noticed the outliers in the 10 and 20 day histograms. To better visualize the outliers, let's compare the 5, 10, and 20 day signal returns to normal distributions with the same mean and standard deviation as each signal return distribution.
project_helper.plot_signal_to_normal_histograms(
[signal_return_5, signal_return_10, signal_return_20],
'Signal Return',
('5 Days', '10 Days', '20 Days'))
While you can see the outliers in the histogram, we need to find the stocks that are causing these outlying returns. We'll use the Kolmogorov-Smirnov Test or KS-Test. This test will be applied to each ticker's signal returns where a long or short signal exists.
# Filter out returns that don't have a long or short signal.
long_short_signal_returns_5 = signal_return_5[signal_5 != 0].stack()
long_short_signal_returns_10 = signal_return_10[signal_10 != 0].stack()
long_short_signal_returns_20 = signal_return_20[signal_20 != 0].stack()
# Get just ticker and signal return
long_short_signal_returns_5 = long_short_signal_returns_5.reset_index().iloc[:, [1,2]]
long_short_signal_returns_5.columns = ['ticker', 'signal_return']
long_short_signal_returns_10 = long_short_signal_returns_10.reset_index().iloc[:, [1,2]]
long_short_signal_returns_10.columns = ['ticker', 'signal_return']
long_short_signal_returns_20 = long_short_signal_returns_20.reset_index().iloc[:, [1,2]]
long_short_signal_returns_20.columns = ['ticker', 'signal_return']
# View some of the data
long_short_signal_returns_5.head(10)
This gives you the data to use in the KS-Test.
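The filtering cell above can be traced on a tiny made-up example (the tickers AAA and BBB and all toy_ names are hypothetical). Indexing with a boolean mask turns the no-signal cells into NaN, and stacking then produces one row per date and ticker. Depending on the pandas version, stack() may or may not drop the NaN cells by itself, so this sketch calls dropna() explicitly as a precaution.

```python
import pandas as pd

dates = pd.to_datetime(['2020-01-01', '2020-01-02'])
# Hypothetical filtered signals and signal returns for two tickers.
toy_signal = pd.DataFrame({'AAA': [1, 0], 'BBB': [0, -1]}, index=dates)
toy_signal_return = pd.DataFrame({'AAA': [0.03, 0.01], 'BBB': [0.02, -0.04]}, index=dates)

# Keep only cells with a long or short signal; the rest become NaN.
stacked = toy_signal_return[toy_signal != 0].stack().dropna()

# Keep just the ticker and the signal return columns.
tidy = stacked.reset_index().iloc[:, [1, 2]]
tidy.columns = ['ticker', 'signal_return']
print(tidy)
```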
Now it's time to implement the function calculate_kstest, which runs the Kolmogorov-Smirnov test (KS test) between a normal distribution and each stock's signal returns. Use scipy.stats.kstest to perform the KS test. When calculating the standard deviation of the signal returns, make sure to set the delta degrees of freedom to 0.
For this function, we don't recommend you try to find a vectorized solution. Instead, you should iterate over the groups returned by groupby.
from scipy.stats import kstest
def calculate_kstest(long_short_signal_returns):
    """
    Calculate the KS-Test against the signal returns with a long or short signal.
    Parameters
    ----------
    long_short_signal_returns : DataFrame
        The signal returns which have a signal.
        This DataFrame contains two columns, "ticker" and "signal_return"
    Returns
    -------
    ks_values : Pandas Series
        KS statistic for all the tickers
    p_values : Pandas Series
        P value for all the tickers
    """
    # TODO: Implement function
    # Parameters of the reference normal distribution, taken from all the
    # signal returns (population standard deviation, ddof=0)
    all_returns = long_short_signal_returns['signal_return']
    normal_args = (all_returns.mean(), np.std(all_returns, ddof=0))
    # Lists to hold the p and KS values, plus the matching ticker names
    p_values = []
    ks_values = []
    col = []
    # Group the rows by ticker and run the KS test on each ticker's signal returns
    for col_name, group in long_short_signal_returns.groupby('ticker'):
        sub = group['signal_return'].values
        ks_value, p_value = kstest(sub, 'norm', args=normal_args)
        p_values.append(p_value)
        ks_values.append(ks_value)
        col.append(col_name)
    p_val = pd.Series(p_values, col)
    ks_val = pd.Series(ks_values, col)
    return ks_val, p_val
project_tests.test_calculate_kstest(calculate_kstest)
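To get a feel for what the KS statistic and p-value report, the following sketch runs scipy.stats.kstest on two synthetic samples (the samples, the seed, and all variable names are made up for illustration). One sample is drawn from a normal distribution and one is the same sample shifted away from it. Both are tested against a normal distribution with the first sample's mean and standard deviation.

```python
import numpy as np
from scipy.stats import kstest

rng = np.random.default_rng(0)
# Synthetic "returns": one normal sample and one shifted copy of it.
normal_sample = rng.normal(loc=0.0, scale=0.02, size=500)
shifted_sample = normal_sample + 0.05

# Reference normal distribution, using ddof=0 for the standard deviation.
mean = normal_sample.mean()
std = normal_sample.std(ddof=0)

ks_normal, p_normal = kstest(normal_sample, 'norm', args=(mean, std))
ks_shifted, p_shifted = kstest(shifted_sample, 'norm', args=(mean, std))
print(ks_normal, p_normal)
print(ks_shifted, p_shifted)
```

The shifted sample should produce a much larger KS statistic and a far smaller p-value, which is the pattern the outlier search in the next problem looks for.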
Using the signal returns we created above, let's calculate the ks and p values.
ks_values_5, p_values_5 = calculate_kstest(long_short_signal_returns_5)
ks_values_10, p_values_10 = calculate_kstest(long_short_signal_returns_10)
ks_values_20, p_values_20 = calculate_kstest(long_short_signal_returns_20)
print('ks_values_5')
print(ks_values_5.head(10))
print('p_values_5')
print(p_values_5.head(10))
With the ks and p values calculated, let's find which symbols are the outliers. Implement the find_outliers function to find the symbols whose p-value is below pvalue_threshold and whose KS statistic is above ks_threshold.
def find_outliers(ks_values, p_values, ks_threshold, pvalue_threshold=0.05):
    """
    Find outlying symbols using KS values and P-values
    Parameters
    ----------
    ks_values : Pandas Series
        KS statistic for all the tickers
    p_values : Pandas Series
        P value for all the tickers
    ks_threshold : float
        The threshold for the KS statistic
    pvalue_threshold : float
        The threshold for the p-value
    Returns
    -------
    outliers : set of str
        Symbols that are outliers
    """
    # TODO: Implement function
    outliers = set()
    for ticker in p_values.index:
        # A ticker is an outlier only when both conditions hold
        if p_values[ticker] < pvalue_threshold and ks_values[ticker] > ks_threshold:
            outliers.add(ticker)
    return outliers
project_tests.test_find_outliers(find_outliers)
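The two threshold conditions can also be combined as a boolean mask over the Series. This is a sketch with made-up values (the tickers AAA, BBB, CCC and the toy_ names are hypothetical), using the default p-value threshold of 0.05 and a KS threshold of 0.8.

```python
import pandas as pd

# Hypothetical KS statistics and p-values, indexed by ticker.
toy_ks = pd.Series({'AAA': 0.30, 'BBB': 0.35, 'CCC': 0.90})
toy_p = pd.Series({'AAA': 0.70, 'BBB': 0.46, 'CCC': 0.01})

# A ticker is flagged only if both conditions hold.
mask = (toy_p < 0.05) & (toy_ks > 0.8)
toy_outliers = set(toy_ks[mask].index)
print(toy_outliers)  # {'CCC'}
```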
Using the find_outliers function you implemented, let's see what we found.
ks_threshold = 0.8
outliers_5 = find_outliers(ks_values_5, p_values_5, ks_threshold)
outliers_10 = find_outliers(ks_values_10, p_values_10, ks_threshold)
outliers_20 = find_outliers(ks_values_20, p_values_20, ks_threshold)
outlier_tickers = outliers_5.union(outliers_10).union(outliers_20)
print('{} Outliers Found:\n{}'.format(len(outlier_tickers), ', '.join(list(outlier_tickers))))
Let's compare the 5, 10, and 20 day signal returns without outliers to normal distributions. Also, let's see how the P-Value has changed with the outliers removed.
good_tickers = list(set(close.columns) - outlier_tickers)
project_helper.plot_signal_to_normal_histograms(
[signal_return_5[good_tickers], signal_return_10[good_tickers], signal_return_20[good_tickers]],
'Signal Return Without Outliers',
('5 Days', '10 Days', '20 Days'))
That's more like it! The returns are closer to a normal distribution. You have finished the research phase of a Breakout Strategy. You can now submit your project.
Now that you're done with the project, it's time to submit it. Click the submit button in the bottom right. One of our reviewers will give you feedback on your project with a pass or not passed grade. You can continue to the next section while you wait for feedback.